/////////////////////////////////////////////////////////////////////////////
// Original code from libhdfs3. Copyright (c) 2013 - 2014, Pivotal Inc.
// All rights reserved. Author: Zhanwei Wang
/////////////////////////////////////////////////////////////////////////////
//  Modifications by Kumo Inc.
// Copyright (C) Kumo inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//



#include <atomic>
#include <turbo/times/time.h>
#include <kmhdfs/common/exception.h>
#include <kmhdfs/common/exception_internal.h>
#include <kmhdfs/client/file_system_inter.h>
#include <kmhdfs/client/lease_renewer.h>
#include <turbo/log/logging.h>
#include <kmhdfs/client/output_stream.h>
#include <kmhdfs/client/output_stream_impl.h>
#include <kmhdfs/client/packet.h>
#include <kmhdfs/client/packet_header.h>

#include <cassert>
#include <memory>
#include <inttypes.h>

namespace Hdfs {
    namespace Internal {
        /**
         * Build a stream in the closed state.
         * All counters and sizes start at zero, except the checksum size which is
         * the width of a 32-bit integer. A fresh Checksum object is allocated and
         * the last-send timestamp is set to the current time.
         */
        OutputStreamImpl::OutputStreamImpl()
            : /*heartBeatStop(true),*/
              packets(0),
              closed(true),
              isAppend(false),
              syncBlock(false),
              checksumSize(sizeof(int32_t)),
              chunkSize(0),
              chunksPerPacket(0),
              closeTimeout(0),
              heartBeatInterval(0),
              packetSize(0),
              position(0),
              replication(0),
              blockSize(0),
              bytesWritten(0),
              cursor(0),
              lastFlushed(0),
              nextSeqNo(0) {
            checksum = std::make_shared<Checksum>();
            lastSend = turbo::Time::current_time();
#ifdef MOCK
            stub = nullptr;
#endif
        }

        /**
         * Close the stream if it is still open.
         * Whatever close() throws is discarded so that nothing escapes the
         * destructor.
         */
        OutputStreamImpl::~OutputStreamImpl() {
            if (closed) {
                return;
            }

            try {
                close();
            } catch (...) {
                // Deliberately ignored: a destructor must not throw.
            }
        }

        void OutputStreamImpl::checkStatus() {
            if (closed) {
                THROW(HdfsIOException, "OutputStreamImpl: stream is not opened.");
            }

            lock_guard<mutex> lock(mut);

            if (lastError != exception_ptr()) {
                rethrow_exception(lastError);
            }
        }

        void OutputStreamImpl::setError(const exception_ptr &error) {
            try {
                lock_guard<mutex> lock(mut);
                lastError = error;
            } catch (...) {
            }
        }

        /**
         * To create or append a file.
         * @param fs hdfs file system.
         * @param path the file path.
         * @param pair the result of create or append.
         * @param flag creation flag, can be Create, Append or Create|Overwrite.
         * @param permission create a new file with given permission.
         * @param createParent if the parent does not exist, create it.
         * @param replication create a file with given number of replication.
         * @param blockSize  create a file with given block size.
         */
        void OutputStreamImpl::open(std::shared_ptr<FileSystemInter> fs, const char *path,
                                    std::pair<std::shared_ptr<LocatedBlock>, std::shared_ptr<Hdfs::FileStatus> > &pair,
                                    int flag, const Permission &permission, bool createParent, int replication,
                                    int64_t blockSize, int64_t fileId) {
            if (NULL == path || 0 == strlen(path) || replication < 0 || blockSize < 0) {
                THROW(InvalidParameter, "Invalid parameter.");
            }

            if (!(flag == Create || flag == (Create | SyncBlock) || flag == Overwrite
                  || flag == (Overwrite | SyncBlock) || flag == Append
                  || flag == (Append | SyncBlock) || flag == (Create | Overwrite)
                  || flag == (Create | Overwrite | SyncBlock)
                  || flag == (Create | Append)
                  || flag == (Create | Append | SyncBlock))) {
                THROW(InvalidParameter, "Invalid flag.");
            }

            try {
                openInternal(fs, path, pair, flag, permission, createParent, replication,
                             blockSize, fileId);

                do {
                    try {
                        //append
                        if (flag & Append) {
                            initAppend(pair);
                            LeaseRenewer::GetLeaseRenewer().StartRenew(filesystem);
                            break;
                        }
                    } catch (const FileNotFoundException &e) {
                        if (!(flag & Create)) {
                            throw;
                        }
                    }
                    //create
                    assert((flag & Create) || (flag & Overwrite));
                    closed = false;
                    computePacketChunkSize();
                    LeaseRenewer::GetLeaseRenewer().StartRenew(filesystem);
                } while (0);
            } catch (...) {
                reset();
                throw;
            }
        }

        void OutputStreamImpl::computePacketChunkSize() {
            int chunkSizeWithChecksum = chunkSize + checksumSize;
            static const int packetHeaderSize = PacketHeader::GetPkgHeaderSize();
            chunksPerPacket =
                    (packetSize - packetHeaderSize + chunkSizeWithChecksum - 1)
                    / chunkSizeWithChecksum;
            chunksPerPacket = chunksPerPacket > 1 ? chunksPerPacket : 1;
            packetSize = chunksPerPacket * chunkSizeWithChecksum + packetHeaderSize;
            buffer.resize(chunkSize);
        }

        void OutputStreamImpl::initAppend(
            std::pair<std::shared_ptr<LocatedBlock>, std::shared_ptr<FileStatus> > &lastBlockWithStatus) {
            FileStatus fileInfo;
            lastBlock = lastBlockWithStatus.first;

            if (lastBlockWithStatus.second) {
                fileInfo = *lastBlockWithStatus.second;
            } else {
                fileInfo = filesystem->getFileStatus(this->path.c_str());
            }

            closed = false;

            try {
                this->blockSize = fileInfo.getBlockSize();
                cursor = fileInfo.getLength();

                if (lastBlock) {
                    isAppend = true;
                    bytesWritten = lastBlock->getNumBytes();
                    int64_t usedInLastBlock = fileInfo.getLength() % blockSize;
                    int64_t freeInLastBlock = blockSize - usedInLastBlock;

                    if (freeInLastBlock == this->blockSize) {
                        THROW(HdfsIOException,
                              "OutputStreamImpl: the last block for file %s is full.",
                              this->path.c_str());
                    }

                    int usedInCksum = cursor % chunkSize;
                    int freeInCksum = chunkSize - usedInCksum;

                    if (usedInCksum > 0 && freeInCksum > 0) {
                        /*
                         * if there is space in the last partial chunk, then
                         * setup in such a way that the next packet will have only
                         * one chunk that fills up the partial chunk.
                         */
                        packetSize = 0;
                        chunkSize = freeInCksum;
                    } else {
                        /*
                         * if the remaining space in the block is smaller than
                         * that expected size of of a packet, then create
                         * smaller size packet.
                         */
                        packetSize =
                                packetSize < freeInLastBlock ? packetSize : static_cast<int>(freeInLastBlock);
                    }
                }
            } catch (...) {
                completeFile(false);
                reset();
                throw;
            }

            computePacketChunkSize();
        }

        void OutputStreamImpl::openInternal(std::shared_ptr<FileSystemInter> fs, const char *path,
                                            std::pair<std::shared_ptr<LocatedBlock>, std::shared_ptr<Hdfs::FileStatus> >
                                            &pair,
                                            int flag, const Permission &permission, bool createParent,
                                            int replication, int64_t blockSize, int64_t fileId) {
            filesystem = fs;
            this->path = fs->getStandardPath(path);
            this->fileId = fileId;
            this->replication = replication;
            this->blockSize = blockSize;
            syncBlock = flag & SyncBlock;
            conf = std::shared_ptr<SessionConfig>(new SessionConfig(fs->getConf()));
            VKLOG(290) << turbo::str_format("open file %s for %s", this->path.c_str(),
                                            (flag & Append ? "append" : "write"));
            packets.setMaxSize(conf->getPacketPoolSize());

            if (0 == replication) {
                this->replication = conf->getDefaultReplica();
            } else {
                this->replication = replication;
            }

            if (0 == blockSize) {
                this->blockSize = conf->getDefaultBlockSize();
            } else {
                this->blockSize = blockSize;
            }

            chunkSize = conf->getDefaultChunkSize();
            packetSize = conf->getDefaultPacketSize();
            heartBeatInterval = conf->getHeartBeatInterval();
            closeTimeout = conf->getCloseFileTimeout();

            if (packetSize < chunkSize) {
                THROW(InvalidParameter,
                      "OutputStreamImpl: packet size %d is less than the chunk size %d.",
                      packetSize, chunkSize);
            }

            if (0 != this->blockSize % chunkSize) {
                THROW(InvalidParameter,
                      "OutputStreamImpl: block size %" PRId64 " is not the multiply of chunk size %d.",
                      this->blockSize, chunkSize);
            }
        }

        /**
         * Append data to the file.
         * An exception raised while appending is stored on the stream before it
         * is rethrown.
         * @param buf the data to append, must not be NULL.
         * @param size the number of bytes to append, must not be negative.
         * @throw InvalidParameter if buf is NULL or size is negative.
         */
        void OutputStreamImpl::append(const char *buf, int64_t size) {
            VKLOG(280) << turbo::str_format("append file %s size is %" PRId64 ", offset %" PRId64 " next pos %" PRId64,
                                            path.c_str(), size, cursor, size + cursor);

            const bool badArgument = (nullptr == buf) || (size < 0);

            if (badArgument) {
                THROW(InvalidParameter, "Invalid parameter.");
            }

            checkStatus();

            try {
                appendInternal(buf, size);
            } catch (...) {
                // Remember the failure so that later calls fail in checkStatus().
                setError(current_exception());
                throw;
            }
        }

        /**
         * Split the input into chunks and add them to packets.
         * Whole chunks are taken straight from the caller's memory when the chunk
         * buffer is empty; everything else is staged in the chunk buffer until it
         * is full. A packet is sent when it is full or when the block is full,
         * and a full block also closes the pipeline. The cursor advances by size
         * once all data has been consumed.
         * @param buf the data to append.
         * @param size the number of bytes in buf.
         */
        void OutputStreamImpl::appendInternal(const char *buf, int64_t size) {
            const char *src = buf;
            int64_t remaining = size;

            while (remaining > 0) {
                int step = buffer.size() - position;
                step = step < remaining ? step : static_cast<int>(remaining);

                // True when a whole chunk can be used without going through the buffer.
                const bool wholeChunk =
                        0 == position && remaining >= static_cast<int64_t>(buffer.size());

                checksum->update(src, step);

                if (wholeChunk) {
                    appendChunkToPacket(src, step);
                    bytesWritten += step;
                    checksum->reset();
                } else {
                    memcpy(buffer.data() + position, src, step);
                    position += step;

                    if (position == static_cast<int>(buffer.size())) {
                        // The staged chunk is complete, hand it to the packet.
                        appendChunkToPacket(buffer.data(), buffer.size());
                        bytesWritten += buffer.size();
                        checksum->reset();
                        position = 0;
                    }
                }

                src += step;
                remaining -= step;

                if (currentPacket
                    && (currentPacket->isFull() || bytesWritten == blockSize)) {
                    sendPacket(currentPacket);

                    if (isAppend) {
                        // The first packet of an append used reduced sizes;
                        // go back to the configured defaults.
                        isAppend = false;
                        chunkSize = conf->getDefaultChunkSize();
                        packetSize = conf->getDefaultPacketSize();
                        computePacketChunkSize();
                    }

                    if (bytesWritten == blockSize) {
                        closePipeline();
                    }
                }
            }

            cursor += size;
        }

        void OutputStreamImpl::appendChunkToPacket(const char *buf, int size) {
            assert(NULL != buf && size > 0);

            if (!currentPacket) {
                currentPacket = packets.getPacket(packetSize, chunksPerPacket, bytesWritten,
                                                  nextSeqNo++, checksumSize);
            }

            currentPacket->addChecksum(checksum->getValue());
            currentPacket->addData(buf, size);
            currentPacket->increaseNumChunks();
        }

        /**
         * Send a packet through the pipeline, creating the pipeline first when
         * there is none. Afterwards the current packet is released and the
         * last-send timestamp is refreshed.
         * @param packet the packet to send. It must be the stream's current
         *               packet, because setupPipeline() builds the pipeline from
         *               the current packet.
         */
        void OutputStreamImpl::sendPacket(std::shared_ptr<Packet> packet) {
            // Previously the argument was ignored and currentPacket was sent
            // unconditionally; state the contract and honour the argument.
            assert(packet && packet == currentPacket);

            if (!pipeline) {
                setupPipeline();
            }

            pipeline->send(packet);
            currentPacket.reset();
            lastSend = turbo::Time::current_time();
        }

        /**
         * Create the pipeline used to send packets.
         * Requires a current packet, whose offset in the block is passed to the
         * new pipeline. In MOCK builds the pipeline is taken from the stub instead.
         * The last-send timestamp is refreshed afterwards.
         */
        void OutputStreamImpl::setupPipeline() {
            assert(currentPacket);
#ifdef MOCK
            pipeline = stub->getPipeline();
#else
            pipeline.reset(new PipelineImpl(isAppend, path.c_str(), *conf, filesystem,
                                            CHECKSUM_TYPE_CRC32C, conf->getDefaultChunkSize(),
                                            replication, currentPacket->getOffsetInBlock(),
                                            packets, lastBlock, fileId));
#endif
            lastSend = turbo::Time::current_time();
            /*
             * start heart beat thread (currently disabled)
             */
            /*if (heartBeatStop) {
                if (heartBeatSender.joinable()) {
                    heartBeatSender.join();
                }

                heartBeatStop = false;
                heartBeatSender = thread(&OutputStreamImpl::heartBeatSenderRoutine, this);
            }*/
        }

        /**
         * Flush all buffered data and wait for the acks.
         * Blocks until every ack has arrived. A failure is stored on the stream
         * before it is rethrown.
         */
        void OutputStreamImpl::flush() {
            VKLOG(280) << turbo::str_format("flush file %s at offset %" PRId64, path.c_str(), cursor);
            checkStatus();

            const bool needSync = false;

            try {
                flushInternal(needSync);
            } catch (...) {
                setError(current_exception());
                throw;
            }
        }

        void OutputStreamImpl::flushInternal(bool needSync) {
            if (lastFlushed == cursor && !needSync) {
                return;
            } else {
                lastFlushed = cursor;
            }

            if (position > 0) {
                appendChunkToPacket(buffer.data(), position);
            }

            /*
             * if the pipeline and currentPacket are both NULL,
             * that means the pipeline has been closed and no more data in buffer/packet.
             * already synced when closing pipeline.
             */
            if (!currentPacket && needSync && pipeline) {
                currentPacket = packets.getPacket(packetSize, chunksPerPacket, bytesWritten,
                                                  nextSeqNo++, checksumSize);
            }

            lock_guard<mutex> lock(mut);

            if (currentPacket) {
                currentPacket->setSyncFlag(needSync);
                sendPacket(currentPacket);
            }

            if (pipeline) {
                pipeline->flush();
            }
        }

        /**
         * Report the current write position.
         * The stream status is checked first, so a closed stream or a stored
         * error makes this call throw.
         * @return the current file length as tracked by the stream.
         */
        int64_t OutputStreamImpl::tell() {
            checkStatus();

            const int64_t offset = cursor;
            return offset;
        }

        /**
         * Flush buffered data with the sync flag set.
         * A failure is stored on the stream before it is rethrown.
         * @ref OutputStream::sync
         */
        void OutputStreamImpl::sync() {
            VKLOG(280) << turbo::str_format("sync file %s at offset %" PRId64, path.c_str(), cursor);
            checkStatus();

            const bool needSync = true;

            try {
                flushInternal(needSync);
            } catch (...) {
                setError(current_exception());
                throw;
            }
        }

        void OutputStreamImpl::completeFile(bool throwError) {
            turbo::TimeCost timer;

            while (true) {
                try {
                    bool success;
                    success = filesystem->complete(path, lastBlock.get(), fileId);

                    if (success) {
                        return;
                    }
                } catch (HdfsIOException &e) {
                    if (throwError) {
                        NESTED_THROW(HdfsIOException,
                                     "OutputStreamImpl: failed to complete file %s.",
                                     path.c_str());
                    } else {
                        return;
                    }
                }

                if (closeTimeout > 0) {
                    if (timer.m_elapsed() >= closeTimeout) {
                        if (throwError) {
                            THROW(HdfsIOException,
                                  "OutputStreamImpl: timeout when complete file %s, timeout interval %d ms.",
                                  path.c_str(), closeTimeout);
                        } else {
                            return;
                        }
                    }
                }

                try {
                    sleep_for(std::chrono::milliseconds(400));
                } catch (...) {
                }
            }
        }

        /**
         * close the stream.
         */
        void OutputStreamImpl::closePipeline() {
            lock_guard<mutex> lock(mut);

            if (!pipeline) {
                return;
            }

            if (currentPacket) {
                sendPacket(currentPacket);
            }

            currentPacket = packets.getPacket(packetSize, chunksPerPacket, bytesWritten, nextSeqNo++,
                                              checksumSize);

            if (syncBlock) {
                currentPacket->setSyncFlag(syncBlock);
            }

            lastBlock = pipeline->close(currentPacket);
            assert(lastBlock);
            currentPacket.reset();
            pipeline.reset();
            filesystem->fsync(path);
            bytesWritten = 0;
        }

        void OutputStreamImpl::close() {
            exception_ptr e;

            if (closed) {
                return;
            }

            try {
                //pipeline may be broken
                if (!lastError) {
                    if (lastFlushed != cursor && position > 0) {
                        appendChunkToPacket(buffer.data(), position);
                    }

                    if (lastFlushed != cursor && currentPacket) {
                        sendPacket(currentPacket);
                    }

                    closePipeline();
                    /*heartBeatStop = true;
                     condHeartBeatSender.notify_all();

                     if (heartBeatSender.joinable()) {
                     heartBeatSender.join();
                     }*/
                    completeFile(true);
                }
            } catch (...) {
                e = current_exception();
            }

            LeaseRenewer::GetLeaseRenewer().StopRenew(filesystem);
            VKLOG(280) << turbo::str_format("close file %s for write with length %" PRId64, path.c_str(), cursor);
            reset();

            if (e) {
                rethrow_exception(e);
            }
        }

        /**
         * Return the stream to its closed, empty state.
         * Releases the configuration, current packet, file system, last block and
         * pipeline, clears the stored error and the path, and zeroes all sizes,
         * counters and flags.
         */
        void OutputStreamImpl::reset() {
            closed = true;

            // Owned objects and stored state.
            checksum->reset();
            conf.reset();
            currentPacket.reset();
            filesystem.reset();
            lastBlock.reset();
            lastError = exception_ptr();
            path.clear();
            pipeline.reset();

            // Sizes and settings.
            blockSize = 0;
            chunkSize = 0;
            chunksPerPacket = 0;
            packetSize = 0;
            replication = 0;
            closeTimeout = 0;
            heartBeatInterval = 0;

            // Progress counters.
            bytesWritten = 0;
            cursor = 0;
            lastFlushed = 0;
            nextSeqNo = 0;
            position = 0;

            // Mode flags.
            isAppend = false;
            syncBlock = false;
        }

        /**
         * Describe the stream for diagnostics.
         * @return "OutputStream (not opened)" when no path is set, otherwise
         *         "OutputStream for path " followed by the path.
         */
        std::string OutputStreamImpl::toString() {
            // The path is empty until the stream is opened and cleared again by reset().
            if (path.empty()) {
                return std::string("OutputStream (not opened)");
            }

            return std::string("OutputStream for path ") + path;
        }

        /*void OutputStreamImpl::heartBeatSenderRoutine() {
            assert(heartBeatStop == false);

            while (!heartBeatStop) {
                try {
                    unique_lock < mutex > lock(mut);
                    condHeartBeatSender.wait_for(lock, milliseconds(1000));

                    try {
                        try {
                            if (pipeline
                                    && ToMilliSeconds(lastSend, steady_clock::now())
                                    >= heartBeatInterval) {
                                pipeline->send(std::shared_ptr < Packet > (new Packet()));
                                lastSend = steady_clock::now();
                            }
                        } catch (...) {
                            NESTED_THROW(Hdfs::HdfsIOException, "Failed to send heart beat, path: %s",
                                         path.c_str());
                        }
                    } catch (...) {
                        lastError = current_exception();
                        throw;
                    }
                } catch (const std::bad_alloc & e) {

                     * keep quiet if we run out of memory, since writing log need memory,
                     * that may cause the process terminated.

                    break;
                } catch (const Hdfs::HdfsException & e) {
                    KLOG(ERROR)<<turbo::str_format( "Heart beat thread exit since %s",
                        GetExceptionDetail(e));
                } catch (const std::exception & e) {
                    KLOG(ERROR)<<turbo::str_format( "Heart beat thread exit since %s",
                        e.what());
                }
            }

            heartBeatStop = true;
        }*/
    }
}
